package com.atguigu.flink.chapter10;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Demo that reads a Kafka topic through Flink SQL's {@code upsert-kafka}
 * connector and prints the resulting table to standard output.
 *
 * <p>The job registers a table {@code s2} (columns {@code id} and
 * {@code vc_sum}, keyed on {@code id}) over topic {@code s2_upsert} and then
 * runs {@code select * from s2}.
 */
public class UpsertKafkaConnectorDemo_2 {

    /** DDL registering table {@code s2} on top of the {@code s2_upsert} topic. */
    private static final String CREATE_S2_TABLE =
            "create table s2("
                    + "id string,"
                    + "vc_sum int,"
                    // upsert-kafka requires a primary key; Flink does not validate it.
                    + "primary key(id) not enforced"
                    + ") with("
                    + " 'connector' = 'upsert-kafka', "
                    + " 'topic' = 's2_upsert', "
                    + " 'properties.bootstrap.servers' = 'hadoop102:9092', "
                    + " 'key.format' = 'json', "
                    + " 'value.format' = 'json' "
                    + ")";

    /** Query whose result is printed by {@link #main(String[])}. */
    private static final String SELECT_ALL_FROM_S2 = "select * from s2";

    /**
     * Builds the execution environment, registers the {@code s2} table and
     * prints every row produced by {@code select * from s2}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        // Pin the REST endpoint to a fixed port.
        Configuration restConf = new Configuration();
        restConf.setInteger("rest.port", 3000);

        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment(restConf);
        // Checkpoint with an interval of 3000, stored via JobManagerCheckpointStorage.
        streamEnv.enableCheckpointing(3000);
        streamEnv.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
        streamEnv.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
        tableEnv.executeSql(CREATE_S2_TABLE);
        tableEnv.sqlQuery(SELECT_ALL_FROM_S2).execute().print();
    }
}



//public class UpsertKafkaConnectorDemo_2 {
//    public static void main(String[] args) {
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",2000);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//        env.setParallelism(1);
//        env.enableCheckpointing(3000);
//        env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//
//        tEnv.executeSql("create table s2(" +
//                "id string," +
//                "vc_sum int," +
//                "primary key(id) not enforced" +
//                ") with(" +
//                " 'connector' = 'upsert-kafka', " +
//                " 'topic' = 's2_upsert', " +
//                " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                " 'key.format' = 'json', " +
//                " 'value.format' = 'json' " +
//                ")");
//
//        tEnv.sqlQuery("select * from s2").execute().print();
//    }
//}


//public class UpsertKafkaConnectorDemo_2 {
//    public static void main(String[] args) {
//        Configuration conf = new Configuration();
//        conf.setInteger("rest.port",2000);
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
//        env.setParallelism(1);
//        env.enableCheckpointing(3000);
//        env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//
//        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
//
//        tEnv.executeSql("create table s2(" +
//                "id string," +
//                "vc_sum int," +
//                "primary key(id) not enforced" +
//                ") with(" +
//                " 'connector' = 'upsert-kafka', " +
//                " 'topic' = 's2_upsert', " +
//                " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
//                " 'key.format' = 'json', " +
//                " 'value.format' = 'json' " +
//                ")");
//
//
//        tEnv.sqlQuery("select * from s2").execute().print();
//    }
//}